Add the client-side subscriptions/listen driver #3047
3 issues found across 23 files
2 issues found across 16 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="docs/advanced/subscriptions.md">
<violation number="1" location="docs/advanced/subscriptions.md:91">
P2: The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.</violation>
</file>
| * `client.listen(...)` takes the filter as keyword arguments — they mirror the wire `SubscriptionFilter` field for field. Entering sends the request and returns once the server's acknowledgment arrives, so `sub.honored` (the subset the server agreed to deliver) is always there before the first event. | ||
| * Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one. |
P2: The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/advanced/subscriptions.md, line 91:
<comment>The client docs now give conflicting filter semantics: this line says a subscription may receive sub-resource URIs, while the same page says resource subscriptions match exact URI strings. Consider keeping the event description aligned with exact-match delivery so clients do not build watchers that expect broader notifications.</comment>
<file context>
@@ -88,9 +88,9 @@ Consuming a subscription is one context manager:
* `client.listen(...)` takes the filter as keyword arguments — they mirror the wire `SubscriptionFilter` field for field. Entering sends the request and returns once the server's acknowledgment arrives, so `sub.honored` (the subset the server agreed to deliver) is always there before the first event.
-* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)`. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
+* Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one.
* Leaving the block ends the subscription, with the transport's own spelling: over streamable HTTP the request's response stream is closed (that is the 2026 cancellation signal), on stream transports `notifications/cancelled` is sent.
-* The stream's two endings are control flow. The server closing gracefully simply ends the `async for`; an abrupt drop raises `SubscriptionLost`. There is no replay and no automatic re-listen — a client that reconnects refetches what it depends on:
</file context>
| * Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)` — where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one. | |
| * Iteration yields the same four typed events the server publishes: `ToolsListChanged`, `PromptsListChanged`, `ResourcesListChanged`, and `ResourceUpdated(uri=...)`. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one. |
client.listen() opens one subscription as an async context manager: entering sends the request and waits for the server's acknowledgment (the honored filter subset is on the handle before the first event), iteration yields the same four typed events the server publishes, a graceful server close simply ends the loop, and an abrupt drop raises SubscriptionLost. Exiting the context ends the subscription with the transport's own cancellation spelling (aborting the request's stream over streamable HTTP, notifications/cancelled on stream transports). There is no replay and no automatic re-listen.

Pending events deduplicate - every kind is a level trigger - so the backlog is bounded by the filter's width by construction. The event vocabulary moves to mcp/shared/subscriptions.py (re-exported from the server module) so both sides speak the same types.

The session demultiplexes stream frames by the _meta subscription id: acks in the driver's id namespace are consumed (raw escape-hatch listens keep observing theirs through message_handler), change events are delivered after the message_handler tee so cache eviction always completes before an event-triggered refetch can run, and session teardown settles every open route so a watcher task in a sibling task group cannot hang when the client closes.

A per-request SSE stream that drops without ever carrying an event id now resolves its request with CONNECTION_CLOSED instead of leaving it pending for the session's lifetime.

The legacy subscribe_resource/unsubscribe_resource verbs are deprecated on both Client and ClientSession with a pointer at the replacement.
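The level-trigger deduplication described in this commit message can be sketched as follows. This is a minimal illustration, not the SDK's implementation: `PendingEvents` is a hypothetical name, and the two event classes are stand-ins for the real types in mcp/shared/subscriptions.py. It shows why the backlog cannot exceed the number of distinct events the filter admits.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToolsListChanged:
    """Stand-in for the SDK event type (identity only, no payload)."""


@dataclass(frozen=True)
class ResourceUpdated:
    """Stand-in for the SDK event type; carries only the URI."""

    uri: str


class PendingEvents:
    """Hypothetical backlog: at most one pending entry per distinct event."""

    def __init__(self) -> None:
        # A dict keeps insertion order, so events drain in first-seen order.
        self._pending = {}

    def push(self, event) -> None:
        # A duplicate of an unconsumed event is a no-op: the consumer will
        # refetch once and observe the latest state either way.
        self._pending.setdefault(event, None)

    def pop(self):
        # Remove and return the oldest pending event.
        event = next(iter(self._pending))
        del self._pending[event]
        return event

    def __len__(self) -> int:
        return len(self._pending)
```

Because the events are frozen dataclasses, two notifications for the same URI compare and hash equal, which is what lets a plain dict act as the deduplicating queue.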
The docs-example checker pipes snippets to ruff in the platform encoding; an em-dash in a code comment arrives as invalid UTF-8 on Windows runners.
Move the client's listen-stream demux to a synchronous notification intercept on the dispatcher's receive path, where cancelled/progress interception already lives: acks, events, and teardown signals now advance in wire order relative to the listen result, so a graceful close can no longer outrun the events that preceded it or clobber the acknowledged filter with the fabricated empty one.

Ack consumption keys on the live route registry alone (the per-session id set is gone), route admission enforces the honored filter - loose on URIs, which the spec lets be sub-resources, with a capped backlog settling the route lost against floods - and the response-cache eviction barrier moved to the consumption point, held pending until it completes so a cancelled consumer cannot lose a level trigger.

On the transport, a request whose SSE stream can never answer - a non-resumable drop or an exhausted reconnection budget - now resolves with a synthesized error, contained against teardown races.

Docs: a graceful close is not "stop watching" (servers shed load by closing gracefully, including this SDK's ListenHandler); the watch loop re-listens on both endings, and sub.honored is the delivery contract, not catalog state.
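A watch loop that re-listens on both endings, as the docs note above describes, might look like the sketch below. Everything here is a stand-in so the example runs on its own: `fake_listen` is a hypothetical replacement for `client.listen(...)`, `SubscriptionLost` is redefined locally, and `refetch` and `max_listens` are illustrative parameters. A real loop would run until cancelled and use a proper backoff.

```python
import asyncio
from contextlib import asynccontextmanager


class SubscriptionLost(Exception):
    """Stand-in for the SDK exception raised on an abrupt drop."""


@asynccontextmanager
async def fake_listen(script):
    """Hypothetical stand-in for client.listen(...).

    `script` is a list of (events, ends_abruptly) pairs; each call
    consumes one pair and yields an async iterator over its events.
    """
    events, abrupt = script.pop(0)

    async def stream():
        for event in events:
            yield event
        if abrupt:
            raise SubscriptionLost("stream dropped")

    yield stream()


async def watch(script, refetch, max_listens):
    """Re-listen after either ending; refetch on open and on every event."""
    for _ in range(max_listens):
        try:
            async with fake_listen(script) as sub:
                await refetch()  # no replay, so resync on every (re)listen
                async for _event in sub:
                    await refetch()  # an event is only a cue to refetch
        except SubscriptionLost:
            pass  # abrupt drop: fall through and listen again
        # A graceful close also lands here; it is not a "stop watching" signal.
        await asyncio.sleep(0)  # placeholder for a real backoff
```

The point of the shape is that the graceful and abrupt endings converge on the same next step, so a server shedding load by closing gracefully does not silently end the watch.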
Comment-only pass over the branch's additions: keep the non-inferable invariants and motivations (receive-order rationale, sub-resource admission, the wake-snapshot race, peek/commit semantics) at one to three lines each, tighten docstrings to Google style with Raises sections kept, and drop narration the code already states. No code changes.
469fdb2 to 69e6cd3
4c95899 to 6573774
The regressions I flagged earlier (the wire-order delivery race, the exhausted-reconnection hang) look properly resolved in the follow-up commits, and I found no new issues on this revision — but this is a substantial new public API (the listen driver, a Dispatcher protocol change, transport behavior changes), so it still warrants a human maintainer's review.
Overview
This PR adds the client half of subscriptions/listen (SEP-2575, 2026-07-28): a new mcp/client/subscriptions.py driver exposed as Client.listen(), a shared event vocabulary in mcp/shared/subscriptions.py, a new synchronous on_notify_intercept hook on the Dispatcher protocol (implemented in both JSONRPCDispatcher and DirectDispatcher), streamable-HTTP changes so abandoned per-request SSE streams resolve their waiter with CONNECTION_CLOSED, deprecation of the legacy subscribe/unsubscribe verbs, plus docs, examples, and a large test surface (~30 files).
Prior findings and their resolution
My earlier review flagged three issues; all were addressed in follow-up commits. The delivery-vs-result ordering race was fixed at the cause by moving listen-route bookkeeping onto a synchronous receive-path intercept (with a regression test wiring the exact [ack, event, result] burst against a parked message_handler). The reconnection give-up path now resolves the original request via _resolve_abandoned_request, covering the hang class on the id-bearing branch too. The unbounded _driver_listen_ids set was removed entirely, with ack consumption keyed on the live route registry. I re-checked the current code for these and found no new issues introduced by the fixes.
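The hang class mentioned above (a request whose stream can never answer) comes down to a waiter that nobody resolves. The sketch below shows the general fix under stated assumptions: `PendingRequests`, `RequestError`, and `resolve_abandoned` are hypothetical names, not the SDK's `_resolve_abandoned_request`, and the numeric value of `CONNECTION_CLOSED` is assumed for illustration.

```python
import asyncio

CONNECTION_CLOSED = -32000  # assumed numeric value, for illustration only


class RequestError(Exception):
    """Hypothetical error type carrying a JSON-RPC style code."""

    def __init__(self, code, message):
        super().__init__(message)
        self.code = code


class PendingRequests:
    """Hypothetical waiter table keyed by request id."""

    def __init__(self):
        self._waiters = {}

    def register(self, request_id):
        # Must be called from within a running event loop.
        future = asyncio.get_running_loop().create_future()
        self._waiters[request_id] = future
        return future

    def resolve_abandoned(self, request_id):
        """Fail the waiter when its stream can never deliver a response."""
        future = self._waiters.pop(request_id, None)
        if future is not None and not future.done():
            future.set_exception(RequestError(CONNECTION_CLOSED, "stream closed"))


async def demo():
    pending = PendingRequests()
    waiter = pending.register(1)
    pending.resolve_abandoned(1)  # the stream dropped without an answer
    pending.resolve_abandoned(1)  # second call is a no-op, so races are harmless
    try:
        await waiter
    except RequestError as exc:
        return exc.code
```

Popping the waiter before settling it is what makes the call idempotent, which matters when a transport drop and session teardown both try to resolve the same request.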
Security risks
Low. No auth, crypto, or permission logic is touched. The client-side route admission filters events against the acknowledged filter, and a hostile peer flooding distinct URIs is bounded by the _MAX_PENDING_EVENTS backstop (settling the subscription lost rather than growing memory). The intercept hook contains exceptions so a raising intercept cannot take down the receive loop.
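To make the flood bound concrete, here is a minimal sketch of admission against an acknowledged filter with a capped backlog. It is not the SDK's code: `Route`, `admit`, and the cap value are hypothetical, and the prefix rule used for "loose" URI matching is an assumption about what sub-resource admission could mean.

```python
from dataclasses import dataclass, field

MAX_PENDING_EVENTS = 4  # illustrative cap; the SDK's real value is not given here


@dataclass
class Route:
    """Hypothetical per-subscription state on the client."""

    honored_uris: frozenset
    pending: dict = field(default_factory=dict)
    lost: bool = False

    def admit(self, uri: str) -> bool:
        """Queue an update for `uri` if the honored filter allows it."""
        if self.lost:
            return False
        # Assumed loose match: a honored URI itself or anything beneath it.
        allowed = any(
            uri == honored or uri.startswith(honored.rstrip("/") + "/")
            for honored in self.honored_uris
        )
        if not allowed:
            return False
        if uri not in self.pending and len(self.pending) >= MAX_PENDING_EVENTS:
            # A flood of distinct URIs: settle the route as lost and drop
            # the backlog instead of letting memory grow.
            self.lost = True
            self.pending.clear()
            return False
        self.pending[uri] = None  # duplicates collapse into one entry
        return True
```

Deduplication alone bounds the backlog only when matching is exact; once sub-resources are admitted, the set of distinct URIs is open-ended, which is why a cap is needed as a backstop.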
Level of scrutiny
High. This changes the Dispatcher protocol signature (a cross-cutting seam every dispatcher implementation and several test doubles had to follow), adds a new public API surface with documented semantics (graceful close vs SubscriptionLost, honored-filter contract), and modifies the streamable-HTTP transport's reconnection/abandonment behavior — all production-critical client paths. This is exactly the kind of design-bearing change a human maintainer should sign off on, regardless of test coverage.
Other factors
Test coverage is thorough (24 driver tests, dispatcher-contract tests for the new intercept, interaction-matrix tests, tested docs snippets), coverage is at 100%, and the PR description explicitly flags the conformance-suite gap as a deliberate acceptance. The remaining unresolved cubic comment about sub-resource URI wording in the docs is minor. Given the scope and the protocol-level design decisions involved, I am deferring rather than shadow-approving.
Stacked on #3046 (the request-id and cancellation plumbing) — review this one for the driver only; the base branch carries the transport work.
The client half of `subscriptions/listen` (2026-07-28, SEP-2575), shaped after the ecosystem's own streaming idioms: one context manager, `async for` consumption, the acknowledgment as state on the handle rather than an event in the stream.
Motivation and Context
On the 2026 wire the response to `subscriptions/listen` is the notification stream. The server half shipped in #3035; until now a client had to hand-roll a `message_handler`, park a raw `send_request` in a task, and demultiplex `_meta` subscription ids by hand (see the old `examples/stories/subscriptions/client.py` for what that looked like). The driver folds all of it into structured concurrency:
* `sub.honored` (the filter subset the server agreed to deliver) is always populated, making the spec's "client SHOULD check the acknowledged filter" a plain attribute read. Pre-ack failures raise from enter (server rejection as `MCPError`, a stream that dies unacknowledged as `SubscriptionLost`); nothing degrades silently.
* The event vocabulary moves to `mcp/shared/subscriptions.py` and is re-exported from the server module (no import breaks). Events are level triggers with no payload beyond identity, so unconsumed duplicates deduplicate and the backlog is bounded by the filter's width by construction: no cap, no overflow policy.
* A graceful server close simply ends the `async for`; a drop raises `SubscriptionLost`. No replay, no automatic re-listen: a reconnecting client refetches, which the docs show as a four-line loop.
* Leaving the block ends the subscription with the transport's own spelling: over streamable HTTP the request's response stream is closed, and `notifications/cancelled` is sent on stream transports. The driver itself is transport-blind.
* `Client.listen()` on a pre-2026 connection raises a typed `ListenNotSupportedError` steering to the legacy verbs (TypeScript SDK parity), and `subscribe_resource`/`unsubscribe_resource` now carry deprecation warnings on both `Client` and `ClientSession`, with a migration-guide entry.
Session-level mechanics worth a reviewer's attention: acks are consumed only for ids in the driver's minted namespace, so raw escape-hatch listens (driving `send_request` directly) still observe their acks through `message_handler`; delivered events are handed to the consumer after the `message_handler` tee, so the caching layer's eviction always completes before an event-triggered refetch can run; and session teardown settles every open route, so a watcher task in a sibling task group gets `SubscriptionLost` instead of hanging when the client closes. One transport fix rides along: a per-request SSE stream that drops without ever carrying an event id now resolves its request with `CONNECTION_CLOSED` instead of leaving the waiter pending for the session's lifetime.
How Has This Been Tested?
Client: honored filter round-trip, both event kinds, exact-URI filter silence, exit-frees-slot proven by a second listen on the same session, refetch-after-change, and the legacy steer.
Breaking Changes
None. The event types moved to `mcp/shared/subscriptions.py` but remain importable from `mcp.server.subscriptions`; the deprecated subscription verbs keep working against 2025-era servers.
Types of changes
Checklist
Additional context
The conformance suite has no client-role listen scenarios — the upstream SEP traceability file explicitly excludes the two client-side obligations (subscription-id correlation is client-internal demux, not wire-observable; the ack check has no wire-observable definition) and the stdio client harness is tracked separately upstream. Coverage here is therefore the interaction suite plus TypeScript-SDK behavioral parity; flagging per the repo's conformance rule so the exclusion is a deliberate acceptance, not an oversight.
Deliberate omissions, for the record: no callbacks layer (the TypeScript SDK routes listen events into its notification-handler registrations; here `message_handler` still sees every teed frame, so a handler-style consumer remains constructible), no auto-open-at-connect, no auto-re-listen, and no `close()` method: the context manager is the lifecycle.